[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing #9983

Closed
tdas wants to merge 4 commits

Conversation

@tdas (Contributor) commented Nov 26, 2015

The solution is to save the RDD partitioner in a separate file in the RDD checkpoint directory, that is, `<checkpoint dir>/_partitioner`. In most cases, whether or not the RDD partitioner is recovered does not affect correctness; failing to recover it only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner; if either step fails, checkpointing itself is not affected. This makes the patch safe and backward compatible.
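For illustration, here is a minimal sketch of what such a best-effort save/recover could look like. The object and method names are hypothetical (the actual change lives in `ReliableCheckpointRDD`), and plain Java serialization stands in for Spark's configured serializer:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.Partitioner

// Hypothetical sketch of the best-effort scheme described above: the partitioner is
// written to <checkpoint dir>/_partitioner, and any failure is swallowed so that
// checkpointing itself is never affected.
object PartitionerCheckpointSketch {

  private val PartitionerFileName = "_partitioner"

  /** Best-effort write; failures are logged and ignored. */
  def writePartitioner(checkpointDir: String, partitioner: Partitioner, conf: Configuration): Unit = {
    try {
      val path = new Path(checkpointDir, PartitionerFileName)
      val fs = path.getFileSystem(conf)
      val out = new ObjectOutputStream(fs.create(path, true))  // overwrite if present
      try out.writeObject(partitioner) finally out.close()
    } catch {
      case NonFatal(e) =>
        println(s"Could not save partitioner to $checkpointDir, ignoring: $e")
    }
  }

  /** Best-effort read; returns None if the file is missing or unreadable. */
  def readPartitioner(checkpointDir: String, conf: Configuration): Option[Partitioner] = {
    try {
      val path = new Path(checkpointDir, PartitionerFileName)
      val fs = path.getFileSystem(conf)
      if (fs.exists(path)) {
        val in = new ObjectInputStream(fs.open(path))
        try Some(in.readObject().asInstanceOf[Partitioner]) finally in.close()
      } else {
        None
      }
    } catch {
      case NonFatal(e) =>
        println(s"Could not recover partitioner from $checkpointDir, ignoring: $e")
        None
    }
  }
}
```

A recovered `Some(partitioner)` can be attached to the checkpointed RDD, while `None` simply means the RDD behaves exactly as it did before this patch.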

@tdas (Contributor, Author) commented Nov 26, 2015

@zsxwing @andrewor14 Can you take a look at this?

@@ -55,25 +55,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
* This is called immediately after the first action invoked on this RDD has completed.
*/
protected override def doCheckpoint(): CheckpointRDD[T] = {

@tdas (Author) commented:

All this code has been moved into ReliableCheckpointRDD.createCheckpointedRDD.

/**
 * Write an RDD partition's data to a checkpoint file.
 */
def writePartitionToCheckpointFile[T: ClassTag](
A reviewer (Contributor) commented:
private

@andrewor14 (Contributor) commented:
LGTM. Just style and naming nits.

@SparkQA commented Nov 26, 2015

Test build #46724 has finished for PR 9983 at commit 4dfa265.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor, Author) commented Dec 1, 2015

jenkins test this please

@SparkQA commented Dec 1, 2015

Test build #2132 has finished for PR 9983 at commit 4dfa265.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 1, 2015

Test build #46931 has finished for PR 9983 at commit bf7bebf.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Dec 1, 2015

Test build #46935 has finished for PR 9983 at commit 9eb7250.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 (Contributor) commented:
retest this please

@SparkQA commented Dec 1, 2015

Test build #46969 has finished for PR 9983 at commit 9eb7250.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 (Contributor) commented:
m1.6

asfgit pushed a commit that referenced this pull request Dec 1, 2015
The solution is to save the RDD partitioner in a separate file in the RDD checkpoint directory, that is, `<checkpoint dir>/_partitioner`. In most cases, whether or not the RDD partitioner is recovered does not affect correctness; failing to recover it only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner; if either step fails, checkpointing itself is not affected. This makes the patch safe and backward compatible.

Author: Tathagata Das <[email protected]>

Closes #9983 from tdas/SPARK-12004.

(cherry picked from commit 60b541e)
Signed-off-by: Andrew Or <[email protected]>
@asfgit closed this in 60b541e on Dec 1, 2015
asfgit pushed a commit that referenced this pull request Dec 7, 2015
…ner not present

The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004).

While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is a non-zero chance that saving or recovering the partitioner fails. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected.

Author: Tathagata Das <[email protected]>

Closes #9988 from tdas/SPARK-11932.
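As a rough illustration of the fallback described in the commit message above (the object and method names below are hypothetical, not the actual TrackStateRDD code), the repartition-if-needed logic amounts to something like:

```scala
import scala.reflect.ClassTag

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

object StatePartitionerFallbackSketch {

  /**
   * If the recovered state RDD lost its partitioner (for example, because the
   * best-effort recovery from SPARK-12004 failed), repartition it with the expected
   * partitioner so that subsequent per-batch operations against it do not keep shuffling.
   */
  def ensurePartitioner[K: ClassTag, V: ClassTag](
      prevStateRDD: RDD[(K, V)],
      expected: Partitioner): RDD[(K, V)] = {
    if (prevStateRDD.partitioner.exists(_ == expected)) {
      prevStateRDD                        // partitioner preserved, no extra shuffle
    } else {
      prevStateRDD.partitionBy(expected)  // one-time shuffle to restore the layout
    }
  }
}
```

This keeps recovery correct even when the `_partitioner` file cannot be read, roughly at the cost of one extra shuffle after recovery.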
asfgit pushed a commit that referenced this pull request Dec 7, 2015
…ner not present

The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004).

While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, there is a non-zero chance that saving or recovering the partitioner fails. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected.

Author: Tathagata Das <[email protected]>

Closes #9988 from tdas/SPARK-11932.

(cherry picked from commit 5d80d8c)
Signed-off-by: Tathagata Das <[email protected]>